package com.kafka.cn.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 * @author: yangShen
 * @Description:
 * @Date: 2020/3/31 13:39
 */
public class CallBackProducer {
    /**
     * 未指定分区时：参数 ("aaa","atguigu----" + i)
     *   比如主题topic有3个分区，2个副本，批量发送10条数据，随机选的分区顺序为topic-0，topic-1，topic-2，则显示分区号---offset(消息长度，从0开始)，如下
     *     0---0
     *     0---1
     *     0---2
     *     0---3
     *     1---0
     *     1---1
     *     1---2
     *     2---0
     *     2---1
     *     2---2
     * @param args 参数
     */
    /**
     * 指定分区时：参数 ("aaa", 0,"atguigu","atguigu----" + i)
     *      比如主题topic有3个分区，2个副本，批量发送10条数据，指定0号分区，则显示分区号---offset(消息长度，从0开始,接着上面的0---3)，如下
     *     0---4
     *     0---5
     *     0---6
     *     0---7
     *     0---8
     *     0---9
     *     0---10
     *     0---11
     *     0---12
     *     0---13
     * @param args 参数
     */
    /**
     * 指定key 不指定分区 时：参数("aaa","atguigu","atguigu----" + i)
     *  分区是按照key的hash值来分区的，也可以自定义分区器
     * @param args
     */
    public static void main(String[] args) {
        //1.创建配置信息
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.16.26.16:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //2.创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        //3.发送数据，patition=0,则为0号分区，为Null,则交给Kafka自己区分配
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("aaa", 0,"atguigu","atguigu----" + i), (metadata, exception) -> {
                if (null == exception){
                    System.out.println(metadata.partition()+"---"+metadata.offset());
                }else {
                    exception.printStackTrace();
                }
            });
        }

        //4.关闭资源
        producer.close();
    }

    public static void main2(String[] args) {
        //1.创建配置信息
        Properties properties = new Properties();
        //2.创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        //3.发送数据
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("first", "atguigu----" + i), new Callback() {
                /**
                 * @param metadata  原始数据
                 * @param exception 异常信息
                 */
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (null == exception){
                        System.out.println(metadata.partition()+"---"+metadata.offset());
                    }
                }
            });
        }
    }
}
